#include "session.h"

#include <stdint.h>

#include <iostream>

#include <boost/array.hpp>
#include <boost/asio.hpp>
using namespace boost;
using namespace boost::asio;

#include <protocols/protos/common.pb.h>

#include "common/base/module.h"
#include "common/base/module_mgr.h"
#include "common/base/server_mgr.h"
#include "common/base/sub_server_mgr.h"
#include "log/log.h"
#include "net/message_parse.h"
#include "net/session_filter.h"
#include "net/session_mgr.h"
#include "utils/util.h"

/**
 * @desc Build a session whose socket, acceptor and reconnect timer all run on
 *       the given io_service. The session manager is stored in sessionMgr_
 *       (re-acquired with lock() wherever it is needed) and a fresh
 *       MessageParse instance is allocated for this session.
 * @param ios        io_service used by the socket, acceptor and timer
 * @param sessionMgr manager this session registers itself with
 * @param netInfo    address/port and related settings for this session
 * @param createType how the session came to exist (see SessionCreateType)
 */
Session::Session(boost::asio::io_service& ios,
                 const std::shared_ptr<SessionMgr>& sessionMgr,
                 const std::shared_ptr<ServerNetInfo>& netInfo,
                 SessionCreateType createType)
    : socket_(ios)
    , netInfo_(netInfo)
    , accept_(ios)
    , reconnTimer_(ios)
    , createType_(createType)
{
    // Per-session parser that owns the receive buffers used by startRead().
    msgParse_ = std::make_shared<MessageParse>();
    sessionMgr_ = sessionMgr;
}

/**
 * @desc Tear down the session: cancel any wait still outstanding on the
 *       reconnect timer.
 */
Session::~Session()
{
    reconnTimer_.cancel();
}

/**
 * @desc 初始化
 * @auth Qiwei.Gu
 * @date 2015-04-04 12:51:19
 */
bool Session::initListen()
{
    ip::tcp::resolver resolver(ServerMgr::get().getIOS());
    ip::tcp::resolver::query query(netInfo_->addr, netInfo_->port);
    ip::tcp::endpoint endpoint = *resolver.resolve(query);
    accept_.open(endpoint.protocol());
    accept_.set_option(ip::tcp::acceptor::reuse_address(true));
    accept_.bind(endpoint);
    accept_.listen();

    startAccept();

    return true;
}

bool Session::initConn(bool async)
{
    LOG_DEBUG("Enter Session::initConn！");
    ip::tcp::resolver resolver(ServerMgr::get().getIOS());
    ip::tcp::resolver::query query(netInfo_->addr, netInfo_->port);
    ip::tcp::resolver::iterator iterEndpoint = resolver.resolve(query);

    if (getSocket().is_open())
    {
        const auto& localEndPoint = getSocket().local_endpoint();

        getSocket().cancel();
        getSocket().close();
        boost::system::error_code ec;
        getSocket().open(localEndPoint.protocol(), ec);
        if (ec)
        {
            LOG_ERROR("Open socket failed." << ec.message() );
            return false;
        }
        else
        {
            getSocket().set_option(ip::tcp::acceptor::reuse_address(true));
            getSocket().bind(localEndPoint);
        }
    }
    if (async)
    {
        const auto& func = std::bind(&Session::handleConn, shared_from_this(), std::placeholders::_1);
        getSocket().async_connect(*iterEndpoint, func);
    }
    else
    {
        boost::system::error_code ec;
        getSocket().connect(*iterEndpoint, ec);
        handleConn(ec);
        if (ec)
        {
            LOG_ERROR("Open socket failed." << ec.message() );
            return false;
        }

    }

    return true;
}

/**
 * @desc Return the cached socket handle.
 *       The value is whatever updateHandle() last stored in handle_; the
 *       socket's native handle is not queried here.
 * @return cached handle value
 */
int64_t Session::getHandle()
{
    return handle_;
}

/**
 * @desc Refresh the cached handle from the socket's native handle.
 *       Does nothing while the socket is closed, so the previously cached
 *       value is kept in that case.
 */
void Session::updateHandle()
{
    if (getSocket().is_open())
    {
        handle_ = static_cast<int64_t>(socket_.native_handle());
    }
}

/**
 * @desc Queue one asynchronous read into the parser's receive buffers.
 *       The completion is delivered to handleRead; the handler holds a
 *       shared_ptr to this session so it stays alive while the read is pending.
 *       Logs an error and does nothing if the socket is not open.
 */
void Session::startRead()
{
    if (!socket_.is_open())
    {
        LOG_ERROR("Socket not open.");
        return;
    }

    auto onRead = std::bind(&Session::handleRead, shared_from_this(),
                            std::placeholders::_1,
                            std::placeholders::_2);
    getSocket().async_read_some(msgParse_->getMutableBufs(), onRead);
}

/**
 * @desc Close the socket and, when custom data is attached, notify it via
 *       onSessionClose().
 */
void Session::stop()
{
    getSocket().close();

    if (!customData_)
    {
        return;
    }
    customData_->onSessionClose();
}

/**
 * @desc Report whether the session socket is currently open.
 * @return true if socket_ is open
 */
bool Session::isOpen()
{
    const bool open = socket_.is_open();
    return open;
}

void Session::startAccept()
{
    const std::shared_ptr<SessionMgr>& sessionMgr = sessionMgr_.lock();
    if (!sessionMgr)
    {
        LOG_ERROR("Session manager is null.");
        return;
    }
    tmpAcceptedSession_.reset();
    tmpAcceptedSession_ = ServerMgr::get().getSubServ()->createSession(netInfo_->sessionType, sessionMgr, netInfo_, eSessionCreateAccepted);
    accept_.async_accept(tmpAcceptedSession_->getSocket(),
                         std::bind(&Session::handleAccept,
                                   tmpAcceptedSession_,
                                   std::placeholders::_1, shared_from_this()));
}

/**
 * @desc Completion handler for async_accept. Runs on the newly accepted
 *       session (`this`); `accptSession` is the listening session that owns
 *       the acceptor.
 *       On success the listener is re-armed, then this session is registered
 *       with the session manager and starts reading.
 *       On failure the listener is still re-armed (unless the accept was
 *       aborted or the acceptor is closed), so a single transient accept
 *       error does not stop the server from accepting for good.
 * @param e            result of the accept operation
 * @param accptSession listening session to re-arm; may be null
 */
void Session::handleAccept(const boost::system::error_code& e,
                           const std::shared_ptr<Session>& accptSession)
{
    if (e)
    {
        LOG_ERROR(e.message());

        // operation_aborted means the acceptor was cancelled/closed on purpose.
        if (e == boost::asio::error::operation_aborted)
        {
            return;
        }
        // Re-arming a closed acceptor would just fail again immediately,
        // so only retry while it is still open.
        if (accptSession && accptSession->accept_.is_open())
        {
            accptSession->startAccept();
        }
        return;
    }

    // Start waiting for the next connection again.
    if (accptSession)
    {
        accptSession->startAccept();
    }

    if (!getSocket().is_open())
    {
        LOG_ERROR("Socket not open.");
        return;
    }

    // The peer may already have dropped the connection; the throwing endpoint
    // getters would then raise out of this completion handler, so use the
    // error_code overloads instead.
    boost::system::error_code epErr;
    const ip::tcp::endpoint remoteAddr = getSocket().remote_endpoint(epErr);
    if (epErr)
    {
        LOG_ERROR("Accepted socket is no longer connected: " << epErr.message());
        return;
    }
    const ip::tcp::endpoint localAddr = getSocket().local_endpoint(epErr);
    if (epErr)
    {
        LOG_ERROR("Accepted socket is no longer connected: " << epErr.message());
        return;
    }

    LOG_DEBUG("Socket accepted: server type:" << netInfo_->serverType
              << " remote addr:" << remoteAddr
              << " local addr:" << localAddr);

    const std::shared_ptr<SessionMgr> sessionMgr = sessionMgr_.lock();
    if (!sessionMgr)
    {
        LOG_ERROR("Session manager is null.");
        return;
    }

    updateHandle();
    sessionMgr->addSession(shared_from_this());

    startRead();
}

/**
 * @desc Handle the result of a connect attempt (called as the async_connect
 *       completion handler, and directly by initConn for synchronous connects).
 *       - aborted: the attempt was cancelled; nothing is registered or scheduled.
 *       - failure: the session is removed from the manager, the error is
 *         logged, and a reconnect is scheduled after netInfo_->reconnInterval ms.
 *       - success: the session is registered with the manager and reading starts.
 * @param e result of the connect operation
 * @auth Qiwei.Gu
 * @date 2015-04-04 14:00:05
 */
void Session::handleConn(const boost::system::error_code& e)
{
    const std::shared_ptr<SessionMgr> sessionMgr = sessionMgr_.lock();
    if (!sessionMgr)
    {
        LOG_ERROR("Session manager is null.");
        return;
    }

    // A cancelled connect is neither a success nor a reason to reconnect.
    // Falling through to the success path would register and read from a
    // socket that was never connected.
    if (e == boost::asio::error::operation_aborted)
    {
        LOG_DEBUG("Connect aborted: server type:" << netInfo_->serverType);
        return;
    }

    if (e)
    {
        const std::shared_ptr<Session> self = shared_from_this();
        sessionMgr->removeSession(self);
        if (getSocket().is_open())
        {
            LOG_ERROR(e.message() << " server type:" << netInfo_->serverType
                      << " local addr:" << getSocket().local_endpoint()
                      << " remote addr:" << netInfo_->addr << ":" << netInfo_->port);
        }
        else
        {
            LOG_ERROR(e.message() << " server type:" << netInfo_->serverType
                      << " remote addr:" << netInfo_->addr << ":" << netInfo_->port);
        }

        reconnTimer_.expires_from_now(boost::posix_time::millisec(netInfo_->reconnInterval));
        // Only reconnect when the timer really expired. A cancelled or
        // rescheduled wait completes with an error and must not trigger
        // another connect attempt.
        reconnTimer_.async_wait([self](const boost::system::error_code& timerErr)
        {
            if (!timerErr)
            {
                self->initConn(false);
            }
        });
        return;
    }

    if (getSocket().is_open())
    {
        LOG_DEBUG("Socket connected: server type:" << netInfo_->serverType
                  << " session: " << netInfo_->sessionType
                  << " id: " << netInfo_->serverID
                  << " local addr:" << getSocket().local_endpoint()
                  << " remote addr:" << netInfo_->addr << ":" << netInfo_->port
                  << " ptr: " << std::hex << (uint64_t(this)));
    }

    getSocket().set_option(ip::tcp::acceptor::reuse_address(true));
    reconnTimer_.cancel();
    updateHandle();

    sessionMgr->addSession(shared_from_this());

    startRead();
}


/**
 * @desc Handle a failed read: stop the session, remove it from the session
 *       manager and log the error. Sessions created by connecting out
 *       (eSessionCreateConn) additionally schedule a reconnect after
 *       netInfo_->reconnInterval milliseconds.
 *       If the session manager is gone, only an error is logged.
 * @param e error reported for the read
 * @auth Qiwei.Gu
 * @date 2015-04-22 17:28:28
 */
void Session::handleReadError(const boost::system::error_code& e)
{
    auto sessionMgr = sessionMgr_.lock();
    if (!sessionMgr)
    {
        LOG_ERROR("Sesion manager not found.");
        return;
    }

    const std::shared_ptr<Session> self = shared_from_this();

    stop();
    sessionMgr->removeSession(self);
    LOG_ERROR(e.category().name() << "(" << e.value() << "):" << e.message());

    if (getCreateType() == eSessionCreateConn)
    {
        reconnTimer_.expires_from_now(boost::posix_time::millisec(netInfo_->reconnInterval));
        // Only reconnect when the timer really expired. A cancelled or
        // rescheduled wait completes with an error and must not start
        // another connect attempt.
        reconnTimer_.async_wait([self](const boost::system::error_code& timerErr)
        {
            if (!timerErr)
            {
                self->initConn(false);
            }
        });
    }
}

/**
 * @desc Completion handler for the asynchronous read started by startRead().
 *       Read errors other than operation_aborted go to handleReadError.
 *       Otherwise the received bytes are handed to the message parser:
 *       - eSuc:      dispatch the message, empty the parse buffer, read again
 *       - eNeedMore: read again
 *       - otherwise: log and treat as a read error
 * @param e         result of the read
 * @param transSize number of bytes received by this read
 */
void Session::handleRead(const boost::system::error_code& e,
                         std::size_t transSize)
{
    LOG_DEBUG("recv data, from:" << std::hex << getNetInfo()->getID()
              << " create type: " << getCreateType() << "" << transSize);

    const auto mgr = sessionMgr_.lock();
    if (!mgr)
    {
        LOG_ERROR("Sesion manager not found.");
        return;
    }

    // Disabled: per-session read filters.
    // for (auto& it : sessionMgr->getFilters())
    // {
    //     auto result = it->handleReadFilter(shared_from_this(), e, transSize);
    //     switch (result)
    //     {
    //     case eFilterSucContinue:
    //     {
    //         continue;
    //     }
    //     case eFilterSucSkipOthers:
    //     {
    //         break;
    //     }
    //     case eFilterSucReturn:
    //     {
    //         return;
    //     }
    //     default:
    //     {
    //         LOG_ERROR("filter check failed");
    //         return handleReadError(e);
    //     }
    //     }
    // }

    const bool readFailed = e && e != boost::asio::error::operation_aborted;
    if (readFailed)
    {
        handleReadError(e);
        return;
    }

    const ParseResultState parsed = msgParse_->parse(transSize);

    if (parsed == ParseResultState::eNeedMore)
    {
        startRead();
        return;
    }

    if (parsed != ParseResultState::eSuc)
    {
        LOG_ERROR("parse msg failed.");
        handleReadError(e);
        return;
    }

    auto header = msgParse_->getMsgHeader();
    auto& bufs = msgParse_->getConstBufs();
    const bool handled = dispatchMessage(header.msgID,
                                         buffer_cast<const char*>(bufs.data()),
                                         header.bufLen);
    if (!handled)
    {
        // A message that could not be handled is logged but does not end the session.
        LOG_ERROR("Deal msg failed. id:" << std::hex << header.msgID.msgID);
    }

    // NOTE(review): the whole buffer is consumed, while the log below reports
    // header.bufLen. If one read can carry more than one message, the extra
    // bytes would be dropped here — confirm against MessageParse.
    const std::size_t oldSize = bufs.size();
    bufs.consume(bufs.size());
    LOG_DEBUG("Buffer consume size: " << header.bufLen
              << " old size: " << oldSize
              << " now size: " << bufs.size());
    startRead();
}

/**
 * @desc 向对方发送数据
 * @param buf 数据
 * @param size buf 长度
 * @auth Qiwei.Gu
 * @date 2015-04-12 01:47:09
 */
bool Session::sendData(const char* buf, std::size_t size)
{
    writeBuffer_.sputn(buf, size);
    getSocket().async_send(writeBuffer_.data(),
                           std::bind(&Session::handleSend, this,
                                     std::placeholders::_1,
                                     std::placeholders::_2));
    return true;
}


/**
 * @desc 向对方发送消息
 * @param msgID 消息ID
 * @param msg 消息内容
 * @auth Qiwei.Gu
 * @date 2015-04-12 01:47:09
 */
bool Session::sendMsg(uint16_t moduleType, uint16_t msgID, google::protobuf::Message& msg)
{
    return sendMsg(MessageID(moduleType, msgID), msg);
}

bool Session::sendMsg(uint16_t moduleType, uint16_t msgID, const char* buf, std::size_t bufSize)
{
    return sendMsg(MessageID(moduleType, msgID), buf, bufSize);
}

/**
 * @desc Serialize a protobuf message to a string and send it through the
 *       raw-buffer overload.
 * @param msgID message id written into the frame header
 * @param msg   message content
 * @return result of the forwarded sendMsg call
 */
bool Session::sendMsg(const MessageID& msgID, google::protobuf::Message& msg)
{
    const std::string payload = msg.SerializeAsString();
    return sendMsg(msgID, payload.c_str(), payload.size());
}

bool Session::sendMsg(const MessageID& msgID, const char* buf, std::size_t bufSize)
{
    uint16_t tag = htons(0xDDEE);
    writeBuffer_.sputn((char*)&tag, sizeof(tag));

    uint64_t id = hton64(msgID.msgID);
    writeBuffer_.sputn((char*)&id, sizeof(id));

    uint16_t msgSize = htons(bufSize);
    writeBuffer_.sputn((char*)&msgSize, sizeof(uint16_t));

    if ((writeBuffer_.size() + bufSize) >= MAX_WIRTE_BUFFER_SIZE)
    {
        LOG_ERROR("Send msg buffer, too large. msgID:" << std::hex << msgID.msgID);
        return false;
    }

    writeBuffer_.sputn(buf, bufSize);

    getSocket().async_send(writeBuffer_.data(),
                           std::bind(&Session::handleSend, this,
                                     std::placeholders::_1,
                                     std::placeholders::_2));

    return true;
}

/**
 * @desc Completion handler for async_send. On success the bytes that were
 *       sent are removed from writeBuffer_; on failure the error is logged
 *       and the buffer is left untouched.
 * @param e         result of the send
 * @param transSize number of bytes actually sent
 */
void Session::handleSend(const boost::system::error_code& e,
                         std::size_t transSize)
{
    if (!e)
    {
        LOG_DEBUG("Send data to : (remote:" <<  getSocket().remote_endpoint()
                  << " local: " << getSocket().local_endpoint() << ")"
                  << " create type:" << getCreateType()
                  << " buffer size:" << static_cast<int>(transSize));
        writeBuffer_.consume(transSize);
        LOG_DEBUG("Send over, now size:" << writeBuffer_.size());
    }
    else
    {
        LOG_ERROR(e.message());
    }
}

bool Session::dispatchMessage(const MessageID& msgID, const char* buf, std::size_t bufSize)
{
    auto moduleMgr = ServerMgr::get().getMgr<ModuleMgr>(ManagerType::eModule);
    if (!moduleMgr)
    {
        LOG_ERROR("No found module manager.");
        return false;
    }
    LOG_DEBUG("Recv msg id: " << std::hex << msgID.msgID << std::dec << " size:" << bufSize);
    auto module = moduleMgr->getModule(msgID.stMsg.moduleType);
    if (!module || !module->doDispatch(shared_from_this(), msgID, buf, bufSize))
    {
        if (!module)
        {
            LOG_ERROR("module not found, type:" << msgID.stMsg.moduleType);
        }
        module = moduleMgr->getModule(proto::common::eCommon);
        if (module)
        {
            LOG_DEBUG("trans msg id: " << std::hex << msgID.msgID << std::dec << " size:" << bufSize );
            return module->doTranslate(shared_from_this(), msgID, buf, bufSize);
        }
        LOG_ERROR("No found module " << (uint16_t)msgID.stMsg.moduleType) ;
        return false;
    }
    return true;
}

/**
 * @desc Serialize a protobuf message into a temporary buffer and dispatch it
 *       through the raw-buffer overload, as if it had been received.
 * @param msgID id to dispatch the message under
 * @param msg   message content
 * @return false if serialization fails; otherwise the result of the
 *         raw-buffer dispatchMessage
 */
bool Session::dispatchMessage(const MessageID& msgID, const google::protobuf::Message& msg)
{
    boost::asio::streambuf buf;
    std::ostream os(&buf);

    // Do not dispatch a partially written body if serialization fails.
    if (!msg.SerializeToOstream(&os))
    {
        LOG_ERROR("Serialize msg failed. id:" << std::hex << msgID.msgID);
        return false;
    }

    const auto payload = buf.data();
    return dispatchMessage(msgID, buffer_cast<const char*>(payload), buffer_size(payload));
}
